agentmux_srv\backend\storage\filestore/
offset_ops.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! FileStore offset-based read/write operations.
5
6
7use std::collections::HashMap;
8
9use rusqlite::params;
10
11use super::core::{FileStore, PART_DATA_SIZE};
12use crate::backend::storage::error::StoreError;
13
14impl FileStore {
15    /// Write data at a specific offset.
16    /// The offset must be <= current file size.
17    #[allow(dead_code)]
18    pub fn write_at(
19        &self,
20        zone_id: &str,
21        name: &str,
22        offset: i64,
23        data: &[u8],
24    ) -> Result<(), StoreError> {
25        if data.is_empty() {
26            return Ok(());
27        }
28
29        let key = (zone_id.to_string(), name.to_string());
30        let now = Self::now_ms();
31
32        let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
33        if offset > file.size {
34            return Err(StoreError::Other(format!(
35                "offset {} exceeds file size {}",
36                offset, file.size
37            )));
38        }
39
40        let new_size = std::cmp::max(file.size, offset + data.len() as i64);
41        let pds = PART_DATA_SIZE as i64;
42
43        // Handle circular file data truncation
44        let (actual_offset, actual_data) = if file.opts.circular && file.opts.maxsize > 0 {
45            let start_cir_offset = new_size - file.opts.maxsize;
46            if start_cir_offset > 0 {
47                let end = offset + data.len() as i64;
48                if end <= start_cir_offset {
49                    // Entire write is before the circular window — no-op
50                    return Ok(());
51                }
52                if offset < start_cir_offset {
53                    let skip = (start_cir_offset - offset) as usize;
54                    (start_cir_offset, &data[skip..])
55                } else {
56                    (offset, data)
57                }
58            } else {
59                (offset, data)
60            }
61        } else {
62            (offset, data)
63        };
64
65        // Compute affected parts
66        let start_part = (actual_offset / pds) as i32;
67        let end_part = ((actual_offset + actual_data.len() as i64 - 1) / pds) as i32;
68
69        let conn = self.conn.lock().unwrap();
70        let mut data_pos = 0usize;
71
72        for part_idx in start_part..=end_part {
73            let part_start = part_idx as i64 * pds;
74            let offset_in_part = if part_idx == start_part {
75                (actual_offset - part_start) as usize
76            } else {
77                0
78            };
79
80            // Load existing part if needed
81            let existing: Option<Vec<u8>> = conn
82                .query_row(
83                    "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
84                    params![zone_id, name, part_idx],
85                    |row| row.get(0),
86                )
87                .ok();
88
89            let mut part_data = existing.unwrap_or_default();
90            // Ensure part is large enough
91            if part_data.len() < offset_in_part {
92                part_data.resize(offset_in_part, 0);
93            }
94
95            // Copy data into part
96            let remaining = actual_data.len() - data_pos;
97            let space = PART_DATA_SIZE - offset_in_part;
98            let to_copy = remaining.min(space);
99
100            if offset_in_part < part_data.len() {
101                // Overwrite existing bytes
102                let overwrite_end = (offset_in_part + to_copy).min(part_data.len());
103                let overwrite_len = overwrite_end - offset_in_part;
104                part_data[offset_in_part..offset_in_part + overwrite_len]
105                    .copy_from_slice(&actual_data[data_pos..data_pos + overwrite_len]);
106                if to_copy > overwrite_len {
107                    part_data.extend_from_slice(
108                        &actual_data[data_pos + overwrite_len..data_pos + to_copy],
109                    );
110                }
111            } else {
112                part_data.extend_from_slice(&actual_data[data_pos..data_pos + to_copy]);
113            }
114
115            conn.execute(
116                "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
117                params![zone_id, name, part_idx, part_data],
118            )?;
119            data_pos += to_copy;
120        }
121
122        // Update file size
123        conn.execute(
124            "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
125            params![new_size, now, zone_id, name],
126        )?;
127        drop(conn);
128
129        // Update cache
130        let mut cache = self.cache.lock().unwrap();
131        if let Some(entry) = cache.get_mut(&key) {
132            if let Some(ref mut f) = entry.file {
133                f.size = new_size;
134                f.modts = now;
135            }
136            // Invalidate cached data entries for affected parts
137            for part_idx in start_part..=end_part {
138                entry.data_entries.remove(&part_idx);
139            }
140        }
141
142        Ok(())
143    }
144
145    /// Read data at a specific offset and size.
146    /// For circular files, adjusts offset if it falls before valid data range.
147    /// Returns (adjusted_offset, data).
148    pub fn read_at(
149        &self,
150        zone_id: &str,
151        name: &str,
152        offset: i64,
153        size: i64,
154    ) -> Result<(i64, Vec<u8>), StoreError> {
155        let file = self
156            .stat(zone_id, name)?
157            .ok_or(StoreError::NotFound)?;
158
159        if file.size == 0 {
160            return Ok((0, Vec::new()));
161        }
162
163        let data_len = file.data_length();
164        let data_start = file.data_start_idx();
165
166        // Adjust offset for circular files
167        let mut actual_offset = offset;
168        let mut actual_size = if size == 0 { data_len } else { size };
169
170        if file.opts.circular && file.opts.maxsize > 0 {
171            if actual_offset < data_start {
172                let skip = data_start - actual_offset;
173                actual_offset = data_start;
174                actual_size -= skip;
175            }
176            if actual_size <= 0 {
177                return Ok((data_start, Vec::new()));
178            }
179        }
180
181        // Clamp to available data
182        if actual_offset >= file.size {
183            return Ok((actual_offset, Vec::new()));
184        }
185        let available = file.size - actual_offset;
186        actual_size = actual_size.min(available);
187
188        if actual_size <= 0 {
189            return Ok((actual_offset, Vec::new()));
190        }
191
192        let pds = PART_DATA_SIZE as i64;
193        let start_part = (actual_offset / pds) as i32;
194        let end_part = ((actual_offset + actual_size - 1) / pds) as i32;
195
196        // Load parts from DB
197        let conn = self.conn.lock().unwrap();
198        let mut parts_map: HashMap<i32, Vec<u8>> = HashMap::new();
199        for part_idx in start_part..=end_part {
200            if let Ok(data) = conn.query_row(
201                "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
202                params![zone_id, name, part_idx],
203                |row| row.get::<_, Vec<u8>>(0),
204            ) {
205                parts_map.insert(part_idx, data);
206            }
207        }
208        drop(conn);
209
210        // Assemble result
211        let mut result = Vec::with_capacity(actual_size as usize);
212        for part_idx in start_part..=end_part {
213            if let Some(part_data) = parts_map.get(&part_idx) {
214                let part_start = part_idx as i64 * pds;
215                let skip = if part_start < actual_offset {
216                    (actual_offset - part_start) as usize
217                } else {
218                    0
219                };
220                let remaining = actual_size as usize - result.len();
221                let take = remaining.min(part_data.len().saturating_sub(skip));
222                if take > 0 {
223                    result.extend_from_slice(&part_data[skip..skip + take]);
224                }
225            }
226        }
227
228        Ok((actual_offset, result))
229    }
230}